import time
from kafka import KafkaProducer

# 配置 Kafka 生产者
producer = KafkaProducer(bootstrap_servers='localhost:9092')  # 修改为你的 Kafka 地址
topic_name = 'test'  # 修改为你的主题名

def send_logs_to_kafka(log_file):
    # 打开日志文件并移动到文件末尾
    with open(log_file, 'r') as file:
        file.seek(0, 2)  # 移动到文件末尾

        while True:
            # 读取新行
            line = file.readline()
            if not line:  # 如果没有新行则等待一会
                time.sleep(1)  # 休眠 1 秒再检查
            else:
                # 发送每一行日志到 Kafka
                producer.send(topic_name, value=line.encode('utf-8'))  # 将字符串编码为字节
                producer.flush()  # 确保消息发送出去

if __name__ == "__main__":
    log_file_path = '/opt/user_behavior_logs.log'  # 替换为你的日志文件路径
    try:
        send_logs_to_kafka(log_file_path)
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        producer.close()